// Copyright 2018 The Cockroach Authors.
// Copyright (c) 2022-present, Shanghai Yunxi Technology Co, Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This software (KWDB) is licensed under Mulan PSL v2.
// You can use this software according to the terms and conditions of the Mulan PSL v2.
// You may obtain a copy of Mulan PSL v2 at:
//          http://license.coscl.org.cn/MulanPSL2
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.

package norm

import (
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/cat"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/memo"
	"gitee.com/kwbasedb/kwbase/pkg/sql/opt/props/physical"
	"gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/pgcode"
	"gitee.com/kwbasedb/kwbase/pkg/sql/pgwire/pgerror"
	"gitee.com/kwbasedb/kwbase/pkg/sql/sem/tree"
	"gitee.com/kwbasedb/kwbase/pkg/sql/sqlbase"
	"gitee.com/kwbasedb/kwbase/pkg/sql/types"
	"gitee.com/kwbasedb/kwbase/pkg/util/errorutil"
	"gitee.com/kwbasedb/kwbase/pkg/util/log"
	"github.com/cockroachdb/errors"
)

// ReplaceFunc is the callback function passed to the Factory.Replace method.
// It is called with each child of the expression passed to Replace. See the
// Replace method for more details.
type ReplaceFunc func(e opt.Expr) opt.Expr

// MatchedRuleFunc defines the callback function for the NotifyOnMatchedRule
// event supported by the optimizer and factory. It is invoked each time an
// optimization rule (Normalize or Explore) has been matched. The name of the
// matched rule is passed as a parameter. If the function returns false, then
// the rule is not applied (i.e. skipped).
type MatchedRuleFunc func(ruleName opt.RuleName) bool

// AppliedRuleFunc defines the callback function for the NotifyOnAppliedRule
// event supported by the optimizer and factory. It is invoked each time an
// optimization rule (Normalize or Explore) has been applied.
//
// The function is called with the name of the rule and the expressions it
// affected. For a normalization rule, the source is always nil, and the target
// is the expression constructed by the replace pattern. For an exploration
// rule, the source is the expression matched by the rule, and the target is
// the first expression constructed by the replace pattern. If no expressions
// were constructed, it is nil. Additional expressions beyond the first can be
// accessed by following the NextExpr links on the target expression.
type AppliedRuleFunc func(ruleName opt.RuleName, source, target opt.Expr)

// Factory constructs a normalized expression tree within the memo. As each
// kind of expression is constructed by the factory, it transitively runs
// normalization transformations defined for that expression type. This may
// result in the construction of a different type of expression than what was
// requested. If, after normalization, the expression is already part of the
// memo, then construction is a no-op. Otherwise, a new memo group is created,
// with the normalized expression as its first and only expression.
//
// Factory is largely auto-generated by optgen. The generated code can be found
// in factory.og.go. The factory.go file contains helper functions that are
// invoked by normalization patterns. While most patterns are specified in the
// Optgen DSL, the factory always calls the `onConstruct` method as its last
// step, in order to allow any custom manual code to execute.
type Factory struct {
	evalCtx *tree.EvalContext

	// mem is the Memo data structure that the factory builds.
	mem *memo.Memo

	// funcs is the struct used to call all custom match and replace functions
	// used by the normalization rules. It wraps an unnamed xfunc.CustomFuncs,
	// so it provides a clean interface for calling functions from both the norm
	// and xfunc packages using the same prefix.
	funcs CustomFuncs

	// matchedRule is the callback function that is invoked each time a normalize
	// rule has been matched by the factory. It can be set via a call to the
	// NotifyOnMatchedRule method.
	matchedRule MatchedRuleFunc

	// appliedRule is the callback function which is invoked each time a normalize
	// rule has been applied by the factory. It can be set via a call to the
	// NotifyOnAppliedRule method.
	appliedRule AppliedRuleFunc

	// catalog is the opt catalog, used to resolve names during constant folding
	// of special metadata queries like 'table_name'::regclass.
	catalog cat.Catalog

	// TSWhiteListMap is the list that record which action can execute in ts engine.
	TSWhiteListMap *sqlbase.WhiteListMap

	// TSFlags record some flags used in TS query.
	TSFlags int

	// tsPushHelper check expr can push
	tsPushHelper memo.MetaInfoMap
}

// CheckFlag checks if the flag is set.
// flag: flag that need to be checked
func (f *Factory) CheckFlag(flag int) bool {
	return f.TSFlags&flag > 0
}

// Init initializes a Factory structure with a new, blank memo structure inside.
// This must be called before the factory can be used (or reused).
func (f *Factory) Init(evalCtx *tree.EvalContext, catalog cat.Catalog) {
	// Initialize (or reinitialize) the memo.
	if f.mem == nil {
		f.mem = &memo.Memo{}
	}
	f.mem.Init(evalCtx)

	f.evalCtx = evalCtx
	f.catalog = catalog
	f.funcs.Init(f)
	f.matchedRule = nil
	f.appliedRule = nil
	f.TSFlags = 0
	f.tsPushHelper = make(memo.MetaInfoMap, 0)
}

// DetachMemo extracts the memo from the optimizer, and then re-initializes the
// factory so that its reuse will not impact the detached memo. This method is
// used to extract a read-only memo during the PREPARE phase.
//
// Before extracting the memo, DetachMemo first clears all column statistics in
// the memo. This is used to free up the potentially large amount of memory
// used by histograms. This does not affect the quality of the plan used at
// execution time, since the stats are just recalculated anyway when
// placeholders are assigned. If there are no placeholders, there is no need
// for column statistics, since the memo is already fully optimized.
func (f *Factory) DetachMemo() *memo.Memo {
	m := f.mem
	f.mem = nil
	m.Detach()
	f.Init(f.evalCtx, nil /* catalog */)
	return m
}

// DisableOptimizations disables all transformation rules. The unaltered input
// expression tree becomes the output expression tree (because no transforms
// are applied).
func (f *Factory) DisableOptimizations() {
	f.NotifyOnMatchedRule(func(opt.RuleName) bool { return false })
}

// NotifyOnMatchedRule sets a callback function which is invoked each time a
// normalize rule has been matched by the factory. If matchedRule is nil, then
// no further notifications are sent, and all rules are applied by default. In
// addition, callers can invoke the DisableOptimizations convenience method to
// disable all rules.
func (f *Factory) NotifyOnMatchedRule(matchedRule MatchedRuleFunc) {
	f.matchedRule = matchedRule
}

// NotifyOnAppliedRule sets a callback function which is invoked each time a
// normalize rule has been applied by the factory. If appliedRule is nil, then
// no further notifications are sent.
func (f *Factory) NotifyOnAppliedRule(appliedRule AppliedRuleFunc) {
	f.appliedRule = appliedRule
}

// Memo returns the memo structure that the factory is operating upon.
func (f *Factory) Memo() *memo.Memo {
	return f.mem
}

// Metadata returns the query-specific metadata, which includes information
// about the columns and tables used in this particular query.
func (f *Factory) Metadata() *opt.Metadata {
	return f.mem.Metadata()
}

// CustomFuncs returns the set of custom functions used by normalization rules.
func (f *Factory) CustomFuncs() *CustomFuncs {
	return &f.funcs
}

// CopyAndReplace builds this factory's memo by constructing a copy of a subtree
// that is part of another memo. That memo's metadata is copied to this
// factory's memo so that tables and columns referenced by the copied memo can
// keep the same ids. The copied subtree becomes the root of the destination
// memo, having the given physical properties.
//
// The "replace" callback function allows the caller to override the default
// traversal and cloning behavior with custom logic. It is called for each node
// in the "from" subtree, and has the choice of constructing an arbitrary
// replacement node, or delegating to the default behavior by calling
// CopyAndReplaceDefault, which constructs a copy of the source operator using
// children returned by recursive calls to the replace callback. Note that if a
// non-leaf replacement node is constructed, its inputs must be copied using
// CopyAndReplaceDefault.
//
// Sample usage:
//
//	var replaceFn ReplaceFunc
//	replaceFn = func(e opt.Expr) opt.Expr {
//	  if e.Op() == opt.PlaceholderOp {
//	    return f.ConstructConst(evalPlaceholder(e))
//	  }
//
//	  // Copy e, calling replaceFn on its inputs recursively.
//	  return f.CopyAndReplaceDefault(e, replaceFn)
//	}
//
//	f.CopyAndReplace(from, fromProps, replaceFn)
//
// NOTE: Callers must take care to always create brand new copies of non-
// singleton source nodes rather than referencing existing nodes. The source
// memo should always be treated as immutable, and the destination memo must be
// completely independent of it once CopyAndReplace has completed.
func (f *Factory) CopyAndReplace(
	from memo.RelExpr, fromProps *physical.Required, replace ReplaceFunc,
) {
	if !f.mem.IsEmpty() {
		panic(errors.AssertionFailedf("destination memo must be empty"))
	}

	// Copy all metadata to the target memo so that referenced tables and columns
	// can keep the same ids they had in the "from" memo.
	f.mem.Metadata().CopyFrom(from.Memo().Metadata())

	// Perform copy and replacement, and store result as the root of this
	// factory's memo.
	to := f.invokeReplace(from, replace).(memo.RelExpr)
	f.Memo().SetRoot(to, fromProps)
}

// AssignPlaceholders is used just before execution of a prepared Memo. It makes
// a copy of the given memo, but with any placeholder values replaced by their
// assigned values. This can trigger additional normalization rules that can
// substantially rewrite the tree. Once all placeholders are assigned, the
// exploration phase can begin.
func (f *Factory) AssignPlaceholders(from *memo.Memo) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// This code allows us to propagate errors without adding lots of checks
			// for `if err != nil` throughout the construction code. This is only
			// possible because the code does not update shared state and does not
			// manipulate locks.
			if ok, e := errorutil.ShouldCatch(r); ok {
				err = e
			} else {
				panic(r)
			}
		}
	}()

	// Copy the "from" memo to this memo, replacing any Placeholder operators as
	// the copy proceeds.
	var replaceFn ReplaceFunc
	replaceFn = func(e opt.Expr) opt.Expr {
		if placeholder, ok := e.(*memo.PlaceholderExpr); ok {
			d, err := e.(*memo.PlaceholderExpr).Value.Eval(f.evalCtx)
			if err != nil {
				panic(err)
			}
			return f.ConstructConstVal(d, placeholder.DataType())
		}

		// we should deal with IsConstForLogicPlan in prepare scenario
		dstExpr := f.CopyAndReplaceDefault(e, replaceFn)
		if scalar, ok := e.(opt.ScalarExpr); ok {
			if dstScalar, ok := dstExpr.(opt.ScalarExpr); ok {
				dstScalar.SetConstDeductionEnabled(scalar.CheckConstDeductionEnabled())
			}
		}
		return dstExpr
	}
	f.CopyAndReplace(from.RootExpr().(memo.RelExpr), from.RootProps(), replaceFn)

	return nil
}

// onConstructRelational is called as a final step by each factory method that
// constructs a relational expression, so that any custom manual pattern
// matching/replacement code can be run.
func (f *Factory) onConstructRelational(rel memo.RelExpr) memo.RelExpr {
	// [SimplifyZeroCardinalityGroup]
	// SimplifyZeroCardinalityGroup replaces a group with [0 - 0] cardinality
	// with an empty values expression. It is placed here because it depends on
	// the logical properties of the group in question.
	if rel.Op() != opt.ValuesOp {
		relational := rel.Relational()
		if relational.Cardinality.IsZero() && !relational.CanHaveSideEffects {
			if f.matchedRule == nil || f.matchedRule(opt.SimplifyZeroCardinalityGroup) {
				values := f.funcs.ConstructEmptyValues(relational.OutputCols)
				if f.appliedRule != nil {
					f.appliedRule(opt.SimplifyZeroCardinalityGroup, nil, values)
				}
				return values
			}
		}
	}

	return rel
}

// onConstructScalar is called as a final step by each factory method that
// constructs a scalar expression, so that any custom manual pattern matching/
// replacement code can be run.
func (f *Factory) onConstructScalar(scalar opt.ScalarExpr) opt.ScalarExpr {
	return scalar
}

// ----------------------------------------------------------------------
//
// Convenience construction methods.
//
// ----------------------------------------------------------------------

// ConstructZeroValues constructs a Values operator with zero rows and zero
// columns. It is used to create a dummy input for operators like CreateTable.
func (f *Factory) ConstructZeroValues() memo.RelExpr {
	return f.ConstructValues(memo.EmptyScalarListExpr, &memo.ValuesPrivate{
		Cols: opt.ColList{},
		ID:   f.Metadata().NextUniqueID(),
	})
}

// ConstructJoin constructs the join operator that corresponds to the given join
// operator type.
func (f *Factory) ConstructJoin(
	joinOp opt.Operator, left, right memo.RelExpr, on memo.FiltersExpr, private *memo.JoinPrivate,
) memo.RelExpr {
	switch joinOp {
	case opt.InnerJoinOp:
		return f.ConstructInnerJoin(left, right, on, private)
	case opt.InnerJoinApplyOp:
		return f.ConstructInnerJoinApply(left, right, on, private)
	case opt.LeftJoinOp:
		return f.ConstructLeftJoin(left, right, on, private)
	case opt.LeftJoinApplyOp:
		return f.ConstructLeftJoinApply(left, right, on, private)
	case opt.RightJoinOp:
		return f.ConstructRightJoin(left, right, on, private)
	case opt.FullJoinOp:
		return f.ConstructFullJoin(left, right, on, private)
	case opt.SemiJoinOp:
		return f.ConstructSemiJoin(left, right, on, private)
	case opt.SemiJoinApplyOp:
		return f.ConstructSemiJoinApply(left, right, on, private)
	case opt.AntiJoinOp:
		return f.ConstructAntiJoin(left, right, on, private)
	case opt.AntiJoinApplyOp:
		return f.ConstructAntiJoinApply(left, right, on, private)
	}
	panic(errors.AssertionFailedf("unexpected join operator: %v", log.Safe(joinOp)))
}

// ConstructConstVal constructs one of the constant value operators from the
// given datum value. While most constants are represented with Const, there are
// special-case operators for True, False, and Null, to make matching easier.
// Null operators require the static type to be specified, so that rewrites do
// not change it.
func (f *Factory) ConstructConstVal(d tree.Datum, t *types.T) opt.ScalarExpr {
	if d == tree.DNull {
		return f.ConstructNull(t)
	}
	if boolVal, ok := d.(*tree.DBool); ok {
		// Map True/False datums to True/False operator.
		if *boolVal {
			return memo.TrueSingleton
		}
		return memo.FalseSingleton
	}
	return f.ConstructConst(d, t)
}

// GetTableDescFromKObjectID retrieves the tableDescriptor by kobjectID.
func (f *Factory) GetTableDescFromKObjectID(kobjectID uint64) (*sqlbase.TableDescriptor, error) {
	tabDesc, err := sqlbase.GetTableDescFromID(f.evalCtx.Context, f.evalCtx.Txn, sqlbase.ID(kobjectID))
	if err != nil {
		return nil, err
	}

	return tabDesc, nil
}

// AddColumn adds column to tsPushHelper map, which is used to
// check whether the expr is supported on ts engine.
func (f *Factory) AddColumn(
	col opt.ColumnID, alias string, typ memo.ExprType, pos memo.ExprPos, hash uint32,
) {
	f.tsPushHelper[col] = memo.ExprInfo{Alias: alias, Type: typ, Pos: pos, Hash: hash}
}

// GetPushHelperValue get push helper value
func (f *Factory) GetPushHelperValue(srcMap *memo.MetaInfoMap) {
	if srcMap != nil && len(f.tsPushHelper) > 0 {
		*srcMap = make(memo.MetaInfoMap, len(f.tsPushHelper))
		for key, val := range f.tsPushHelper {
			(*srcMap)[key] = val
		}
	}
}

// TSSupports checks column is supporter by ts engine, column is a logical column , that may be expr
// check by column type(const/column/binary op/ compare op/ agg op) and
// column exists position(project list/where/group by/order by)
func (f *Factory) TSSupports(col opt.ColumnID, pos memo.ExprPos) bool {
	info, ok := f.tsPushHelper[col]
	if !ok {
		return false
	}

	// single column and const can push down anywhere
	if info.Type == memo.ExprTypCol || info.Type == memo.ExprTypConst {
		return true
	}

	// check from whitelist
	return f.TSWhiteListMap.CheckWhiteListParam(info.Hash, uint32(pos))
}

// copyPushFlag copy the push down flag from the original expression.
// only use in prepare mode.
// source is the original Expr, ret is the new Expr.
func copyPushFlag(source, ret memo.RelExpr) {
	if source.IsTSEngine() {
		ret.SetEngineTS()
	}
}

// checkGrouping checks if group cols can execute in ts engine.
// cols is the GroupingCols of (memo.GroupByExpr or memo.ScalarGroupByExpr or memo.DistinctOnExpr).
func (f *Factory) checkGrouping(cols opt.ColSet) bool {
	allPush := true
	cols.ForEach(func(colID opt.ColumnID) {
		push := f.TSSupports(colID, memo.ExprPosGroupBy)
		colMeta := f.Metadata().ColumnMeta(colID)
		if !push || colMeta.Type.Family() == types.BytesFamily {
			allPush = false
		}
	})

	return allPush
}

// checkGroupByExpr checks if memo.GroupByExpr can execute in ts engine.
// input is the child of (memo.GroupByExpr or memo.ScalarGroupByExpr or memo.DistinctOnExpr) of memo tree.
// aggs is the AggregationsExpr of (memo.GroupByExpr or memo.ScalarGroupByExpr or memo.DistinctOnExpr).
// gp is the GroupingPrivate of (memo.GroupByExpr or memo.ScalarGroupByExpr or memo.DistinctOnExpr).
func (f *Factory) checkGroupByExpr(
	input *memo.RelExpr, aggs *memo.AggregationsExpr, gp *memo.GroupingPrivate,
) (bool, bool, bool) {
	canMerge := true
	allPush := f.CheckWhiteListAndSetEngine(input)
	aggCanPush, isDistinct := false, false
	if allPush {
		// group cols, all can push
		allPush = f.checkGrouping(gp.GroupingCols)
		if allPush {
			for i := 0; i < len(*aggs); i++ {
				srcExpr := (*aggs)[i].Agg
				hashCode := memo.GetExprHash(srcExpr)
				f.mem.CheckHelper.OptHelper.ColsOfProjectAndAgg[(*aggs)[i].Col] = struct{}{}
				// check if child of agg can push.
				push := f.Memo().CheckChildExecInTS(srcExpr, hashCode)

				// check if agg itself can push.
				if !push || !f.Memo().GetWhiteList().CheckWhiteListParam(hashCode, memo.ExprPosProjList) {
					canMerge = false
					allPush = false
					break
				}
				f.AddColumn((*aggs)[i].Col, "", memo.ExprTypeAggOp, memo.ExprPosGroupBy, hashCode)
				var isDistinctTmp bool
				aggCanPush, isDistinctTmp = memo.CheckAggCanParallel((*aggs)[i].Agg)
				if isDistinctTmp {
					isDistinct = true
				}
				canMerge = push && aggCanPush
			}
		}

		return allPush, canMerge, isDistinct
	}

	for i := 0; i < len(*aggs); i++ {
		f.mem.CheckHelper.OptHelper.ColsOfProjectAndAgg[(*aggs)[i].Col] = struct{}{}
	}

	return false, false, isDistinct
}

// checkProjectExpr checks if memo.ProjectExpr can execute in ts engine.
// source is the memo.ProjectExpr of memo tree.
func (f *Factory) checkProjectExpr(source *memo.ProjectExpr) bool {
	allPush := f.CheckWhiteListAndSetEngine(&source.Input)
	if allPush {
		for _, proj := range source.Projections {
			if proj.Element.Op() != opt.VariableOp {
				f.mem.CheckHelper.OptHelper.ColsOfProjectAndAgg[proj.Col] = struct{}{}
			}
			// projection list every one is all leave or all delete
			push, hashcode := memo.CheckExprCanExecInTSEngine(proj.Element.(opt.Expr), memo.ExprPosProjList,
				f.TSWhiteListMap.CheckWhiteListParam, false, f.mem.CheckOnlyOnePTagValue())
			if !push {
				return false
			}
			f.AddColumn(proj.Col, "", memo.GetExprType(proj.Element), memo.ExprPosProjList, hashcode)
		}

		source.SetEngineTS()
		return true
	}

	for _, proj := range source.Projections {
		if proj.Element.Op() != opt.VariableOp {
			f.mem.CheckHelper.OptHelper.ColsOfProjectAndAgg[proj.Col] = struct{}{}
		}
	}

	return false
}

// checkSelectExpr checks if memo.SelectExpr can execute in ts engine.
// source is the memo.SelectExpr of memo tree.
func (f *Factory) checkSelectExpr(source *memo.SelectExpr) bool {
	childExecInTS := f.CheckWhiteListAndSetEngine(&source.Input)
	if childExecInTS {
		for _, filter := range source.Filters {
			if !memo.CheckFilterExprCanExecInTSEngine(filter.Condition, memo.ExprPosSelect,
				f.Memo().GetWhiteList().CheckWhiteListParam, f.mem.CheckOnlyOnePTagValue()) {
				return false
			}
		}

		source.SetEngineTS()
		return true
	}

	return false
}

// checkTSScanExpr deals with memo.TSScanExpr of memo tree.
func (f *Factory) checkTSScanExpr(source *memo.TSScanExpr) bool {
	source.Cols.ForEach(func(colID opt.ColumnID) {
		colMeta := f.Metadata().ColumnMeta(colID)
		f.AddColumn(colID, colMeta.Alias, memo.ExprTypCol, memo.ExprPosNone, 0)
	})

	source.SetEngineTS()

	return true
}

// checkLookupJoinExpr checks if memo.LookupJoinExpr can execute in ts engine.
// join can not execute in ts engine, so just check child of LookupJoinExpr and add synchronize Expr.
// source is the memo.LookupJoinExpr of memo tree.
func (f *Factory) checkLookupJoinExpr(source *memo.LookupJoinExpr) bool {
	f.CheckWhiteListAndSetEngine(&source.Input)
	return false
}

// checkJoinExpr checks if (InnerJoinExpr,SemiJoinExpr,MergeJoinExpr,LeftJoinApplyExpr,RightJoinExpr,InnerJoinApplyExpr,FullJoinExpr,LeftJoinExpr)
// can execute in ts engine. they can not execute in ts engine, so just check their child node and add synchronize Expr.
// source is the memo.**JoinExpr of memo tree.
func (f *Factory) checkJoinExpr(source memo.RelExpr) (push bool) {
	switch s := source.(type) {
	case *memo.InnerJoinExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	case *memo.SemiJoinExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	case *memo.MergeJoinExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	case *memo.LeftJoinApplyExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	case *memo.RightJoinExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	case *memo.InnerJoinApplyExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	case *memo.FullJoinExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	case *memo.LeftJoinExpr:
		push = f.checkChildOfJoinExpr(&s.Left, &s.Right)
	}
	return push
}

// checkChildOfJoinExpr checks if the child of the joinExpr can exec in ts engine.
func (f *Factory) checkChildOfJoinExpr(left, right *memo.RelExpr) bool {
	f.CheckWhiteListAndSetEngine(left)
	f.CheckWhiteListAndSetEngine(right)

	return false
}

// CheckWhiteListAndSetEngine checks if each expr of memo tree can execute in ts engine,
// and set the engine of expr to opt.EngineTS when it can execute in ts engine.
func (f *Factory) CheckWhiteListAndSetEngine(src *memo.RelExpr) bool {
	switch e := (*src).(type) {
	case *memo.TSScanExpr:
		return f.checkTSScanExpr(e)
	case *memo.SelectExpr:
		return f.checkSelectExpr(e)
	case *memo.ProjectExpr:
		return f.checkProjectExpr(e)
	case *memo.GroupByExpr:
		childCanPush, canMerge, isDistinct := f.checkGroupByExpr(&e.Input, &e.Aggregations, &e.GroupingPrivate)
		if childCanPush && canMerge {
			// distinct should not twice agg. single node can push distinct.
			if !isDistinct || f.Memo().CheckFlag(opt.SingleMode) {
				e.SetEngineTS()
			}
		}

		return false
	case *memo.ScalarGroupByExpr:
		childCanPush, canMerge, isDistinct := f.checkGroupByExpr(&e.Input, &e.Aggregations, &e.GroupingPrivate)
		if childCanPush && canMerge {
			// distinct should not twice agg. single node can push distinct.
			if !isDistinct || f.Memo().CheckFlag(opt.SingleMode) {
				e.SetEngineTS()
			}
		}
		return false
	case *memo.InnerJoinExpr, *memo.AntiJoinExpr, *memo.AntiJoinApplyExpr, *memo.SemiJoinExpr, *memo.SemiJoinApplyExpr, *memo.MergeJoinExpr,
		*memo.LeftJoinApplyExpr, *memo.LeftJoinExpr, *memo.RightJoinExpr, *memo.InnerJoinApplyExpr, *memo.FullJoinExpr:
		return f.checkJoinExpr(e)
	case *memo.LookupJoinExpr:
		return f.checkLookupJoinExpr(e)
	case *memo.DistinctOnExpr:
		f.checkGroupByExpr(&e.Input, &e.Aggregations, &e.GroupingPrivate)
		return false
	case *memo.LimitExpr:
		f.CheckWhiteListAndSetEngine(&e.Input)
		return false
	case *memo.ScanExpr:
		return false
	case *memo.OffsetExpr:
		f.CheckWhiteListAndSetEngine(&e.Input)
		return false
	case *memo.ValuesExpr:
		return false
	case *memo.Max1RowExpr:
		f.CheckWhiteListAndSetEngine(&e.Input)
		return false
	case *memo.OrdinalityExpr:
		f.CheckWhiteListAndSetEngine(&e.Input)
		return false
	case *memo.WithScanExpr:
		return false
	case *memo.UnionAllExpr, *memo.UnionExpr, *memo.IntersectExpr, *memo.IntersectAllExpr, *memo.ExceptAllExpr, *memo.ExceptExpr:
		return false
	case *memo.WindowExpr:
		f.CheckWhiteListAndSetEngine(&e.Input)
		return false

	default:
		panic(pgerror.New(pgcode.Warning, "push down is not support "+e.Op().String()))
	}
}
